{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Database created successfully!\n"
     ]
    }
   ],
   "source": [
    "# Psycopg2 or SQLITE are both supported.\n",
    "import psycopg2\n",
    "from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT\n",
    "\n",
    "# Connection parameters\n",
    "dbname = \"xxx\"  \n",
    "user = \"xxx\"\n",
    "password = \"xxx\" \n",
    "host = \"xxx\"\n",
    "\n",
    "# Connect to PostgreSQL server\n",
    "conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host)\n",
    "try:\n",
    "    # CREATE DATABASE cannot run inside a transaction block, so switch to autocommit.\n",
    "    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\n",
    "\n",
    "    # Cursor to execute SQL commands; the context manager closes it on exit.\n",
    "    with conn.cursor() as cursor:\n",
    "        # Only create the database when it is missing so this cell can be re-run\n",
    "        # (Restart & Run All) without failing on an already existing database.\n",
    "        cursor.execute(\"SELECT 1 FROM pg_database WHERE datname = %s;\", (\"bluesky_backfill_new\",))\n",
    "        if cursor.fetchone() is None:\n",
    "            cursor.execute(\"CREATE DATABASE bluesky_backfill_new;\")\n",
    "            print(\"Database created successfully!\")\n",
    "        else:\n",
    "            print(\"Database already exists; nothing to do.\")\n",
    "finally:\n",
    "    # Clean up even when one of the statements above failed\n",
    "    conn.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting data_processing.py\n"
     ]
    }
   ],
   "source": [
    "%%file data_processing.py\n",
    "import os\n",
    "import pandas as pd\n",
    "import psycopg2\n",
    "import json\n",
    "import gzip\n",
    "from collections import Counter\n",
    "from tqdm import tqdm\n",
    "from psycopg2.extras import execute_values\n",
    "import numpy as np\n",
    "from multiprocessing import Pool\n",
    "\n",
    "\n",
    "def feed_generator(x):\n",
    "    \"\"\"Flatten ``app.bsky.feed.generator`` records into rows for the ``feed_generator`` table.\n",
    "\n",
    "    Args:\n",
    "        x: Mapping of record type to a list of record dicts; only the\n",
    "            ``\"app.bsky.feed.generator\"`` entry is read.\n",
    "\n",
    "    Returns:\n",
    "        DataFrame with ``cid_entry``, ``did``, ``createdAt`` (formatted as\n",
    "        ``YYYY-MM-DD HH:MM:SS`` in UTC, ``None`` when it cannot be parsed),\n",
    "        ``description``, ``displayName`` and six comma-joined summaries of the\n",
    "        ``skyfeedBuilder`` blocks ('' when the builder is missing or malformed).\n",
    "\n",
    "    Raises:\n",
    "        KeyError: if ``x`` has no ``\"app.bsky.feed.generator\"`` entry or the\n",
    "            records lack ``cid_entry``, ``did`` or ``createdAt``.\n",
    "    \"\"\"\n",
    "    output_columns = [\n",
    "        \"cid_entry\", \"did\", \"createdAt\", \"description\", \"displayName\",\n",
    "        \"skyfeedBuilder_id_input\", \"skyfeedBuilder_id_regex\", \"skyfeedBuilder_id_remove\",\n",
    "        \"skyfeedBuilder_id_sort\", \"skyfeedBuilder_value_regex\", \"skyfeedBuilder_sort_type\",\n",
    "    ]\n",
    "    df = pd.DataFrame(x[\"app.bsky.feed.generator\"])\n",
    "\n",
    "    # If no record in the batch carries one of these keys the column does not exist\n",
    "    # at all, and the lookups below used to raise KeyError and drop the whole table\n",
    "    # for this file. Back-fill them with NaN so the rows are still produced.\n",
    "    for optional in (\"skyfeedBuilder\", \"description\", \"displayName\"):\n",
    "        if optional not in df.columns:\n",
    "            df[optional] = np.nan\n",
    "\n",
    "    def joined(field, block_type):\n",
    "        \"\"\"Return a mapper joining ``field`` of every builder block whose type is ``block_type``.\"\"\"\n",
    "        def extract(builder):\n",
    "            return \", \".join([blk[field] for blk in builder[\"blocks\"] if blk[\"type\"] == block_type])\n",
    "        # apply_if_type maps NaN builders and TypeError/KeyError/AttributeError to ''.\n",
    "        return lambda value: apply_if_type(value, extract)\n",
    "\n",
    "    # output column -> (block field to collect, block type to keep)\n",
    "    summaries = {\n",
    "        \"skyfeedBuilder_id_input\": (\"id\", \"input\"),\n",
    "        \"skyfeedBuilder_id_regex\": (\"id\", \"regex\"),\n",
    "        \"skyfeedBuilder_id_remove\": (\"id\", \"remove\"),\n",
    "        \"skyfeedBuilder_id_sort\": (\"id\", \"sort\"),\n",
    "        \"skyfeedBuilder_value_regex\": (\"value\", \"regex\"),\n",
    "        \"skyfeedBuilder_sort_type\": (\"sortType\", \"sort\"),\n",
    "    }\n",
    "    for column, (field, block_type) in summaries.items():\n",
    "        df[column] = df[\"skyfeedBuilder\"].apply(joined(field, block_type))\n",
    "\n",
    "    # Unparseable timestamps become NaT and are emitted as None.\n",
    "    created = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    df['createdAt'] = created.apply(lambda ts: ts.strftime(\"%Y-%m-%d %H:%M:%S\") if not pd.isna(ts) else None)\n",
    "\n",
    "    return df.loc[:, output_columns]\n",
    "\n",
    "def apply_if_type(value, func, default=''):\n",
    "    \"\"\"Apply ``func`` to ``value``, falling back to ``default`` when that is not possible.\n",
    "\n",
    "    Args:\n",
    "        value: Cell value to transform; a float NaN is treated as missing.\n",
    "        func: Callable invoked as ``func(value)``.\n",
    "        default: Returned for NaN input or when ``func`` raises ``TypeError``,\n",
    "            ``KeyError`` or ``AttributeError``.\n",
    "\n",
    "    Returns:\n",
    "        ``func(value)`` on success, otherwise ``default``. Other exception types\n",
    "        raised by ``func`` propagate to the caller.\n",
    "    \"\"\"\n",
    "    is_missing = isinstance(value, float) and np.isnan(value)\n",
    "    if not is_missing:\n",
    "        try:\n",
    "            return func(value)\n",
    "        except (TypeError, KeyError, AttributeError):\n",
    "            # Value does not have the expected shape; use the fallback below.\n",
    "            pass\n",
    "    return default\n",
    "    \n",
    "\n",
    "def profiles(x):\n",
    "    \"\"\"Build the ``profiles`` rows from the ``app.bsky.actor.profile`` records in ``x``.\n",
    "\n",
    "    Returns a DataFrame with cid_entry, description, displayName, did and\n",
    "    createdAt, where createdAt is a ``YYYY-MM-DD HH:MM:SS`` UTC string or None\n",
    "    when the original value could not be parsed.\n",
    "    \"\"\"\n",
    "    wanted = [\"cid_entry\", 'description', 'displayName', 'did', 'createdAt']\n",
    "    df = pd.DataFrame(x[\"app.bsky.actor.profile\"]).loc[:, wanted]\n",
    "\n",
    "    def to_sql_timestamp(ts):\n",
    "        # NaT / None (unparseable input) is passed on as None.\n",
    "        if pd.isna(ts):\n",
    "            return None\n",
    "        return ts.strftime(\"%Y-%m-%d %H:%M:%S\")\n",
    "\n",
    "    parsed = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    parsed = parsed.where(parsed.notnull(), None)\n",
    "    df['createdAt'] = parsed.apply(to_sql_timestamp)\n",
    "    return df\n",
    "\n",
    "def follows(x):\n",
    "    \"\"\"Build the ``follows`` rows from the ``app.bsky.graph.follow`` records in ``x``.\n",
    "\n",
    "    Returns a DataFrame with cid_entry, subject, did and createdAt; createdAt is\n",
    "    a ``YYYY-MM-DD HH:MM:SS`` UTC string, or None when it could not be parsed.\n",
    "    \"\"\"\n",
    "    records = pd.DataFrame(x[\"app.bsky.graph.follow\"])\n",
    "    df = records.loc[:, [\"cid_entry\", 'subject', 'did', 'createdAt']]\n",
    "\n",
    "    def render(ts):\n",
    "        return None if pd.isna(ts) else ts.strftime(\"%Y-%m-%d %H:%M:%S\")\n",
    "\n",
    "    stamps = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    stamps = stamps.where(stamps.notnull(), None)\n",
    "    df['createdAt'] = stamps.apply(render)\n",
    "    return df\n",
    "\n",
    "def likes(x):\n",
    "    \"\"\"Build the ``likes`` rows from the ``app.bsky.feed.like`` records in ``x``.\n",
    "\n",
    "    The record's ``subject`` reference is split into ``subject_cid`` and\n",
    "    ``subject_uri``; ``subject`` itself is then replaced by the third\n",
    "    \"/\"-separated piece of that URI (presumably the DID of the liked record's\n",
    "    author -- confirm against the AT-URI layout).\n",
    "\n",
    "    Returns:\n",
    "        DataFrame with cid_entry, did, subject, subject_cid, subject_uri and\n",
    "        createdAt (``YYYY-MM-DD HH:MM:SS`` UTC string, None when unparseable).\n",
    "        Fields that cannot be extracted are ''.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(x[\"app.bsky.feed.like\"])\n",
    "\n",
    "    def uri_owner(subject):\n",
    "        pieces = subject[\"uri\"].split(\"/\")\n",
    "        # A URI with fewer than three pieces used to raise IndexError, which\n",
    "        # apply_if_type does not catch, so one bad record lost the whole table.\n",
    "        return pieces[2] if len(pieces) > 2 else ''\n",
    "\n",
    "    # Derive cid/uri first: the last assignment overwrites the subject column.\n",
    "    df[\"subject_cid\"] = df[\"subject\"].apply(lambda s: apply_if_type(s, lambda y: y.get(\"cid\")))\n",
    "    df[\"subject_uri\"] = df[\"subject\"].apply(lambda s: apply_if_type(s, lambda y: y.get(\"uri\")))\n",
    "    df[\"subject\"] = df[\"subject\"].apply(lambda s: apply_if_type(s, uri_owner))\n",
    "\n",
    "    df = df.loc[:, [\"cid_entry\", \"did\", \"subject\", \"subject_cid\", \"subject_uri\", \"createdAt\"]]\n",
    "    created = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    df['createdAt'] = created.apply(lambda ts: ts.strftime(\"%Y-%m-%d %H:%M:%S\") if not pd.isna(ts) else None)\n",
    "    return df\n",
    "\n",
    "def reposts(x):\n",
    "    \"\"\"Build the ``reposts`` rows from the ``app.bsky.feed.repost`` records in ``x``.\n",
    "\n",
    "    The record's ``subject`` reference is split into ``subject_cid`` and\n",
    "    ``subject_uri``; ``subject`` itself is then replaced by the third\n",
    "    \"/\"-separated piece of that URI (presumably the DID of the reposted\n",
    "    record's author -- confirm against the AT-URI layout).\n",
    "\n",
    "    Returns:\n",
    "        DataFrame with cid_entry, did, subject, subject_cid, subject_uri and\n",
    "        createdAt (``YYYY-MM-DD HH:MM:SS`` UTC string, None when unparseable).\n",
    "        Fields that cannot be extracted are ''.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(x[\"app.bsky.feed.repost\"])\n",
    "\n",
    "    def uri_owner(subject):\n",
    "        pieces = subject[\"uri\"].split(\"/\")\n",
    "        # A URI with fewer than three pieces used to raise IndexError, which\n",
    "        # apply_if_type does not catch, so one bad record lost the whole table.\n",
    "        return pieces[2] if len(pieces) > 2 else ''\n",
    "\n",
    "    # Derive cid/uri first: the last assignment overwrites the subject column.\n",
    "    df[\"subject_cid\"] = df[\"subject\"].apply(lambda s: apply_if_type(s, lambda y: y.get(\"cid\")))\n",
    "    df[\"subject_uri\"] = df[\"subject\"].apply(lambda s: apply_if_type(s, lambda y: y.get(\"uri\")))\n",
    "    df[\"subject\"] = df[\"subject\"].apply(lambda s: apply_if_type(s, uri_owner))\n",
    "\n",
    "    df = df.loc[:, [\"cid_entry\", \"did\", \"subject\", \"subject_cid\", \"subject_uri\", \"createdAt\"]]\n",
    "    created = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    df['createdAt'] = created.apply(lambda ts: ts.strftime(\"%Y-%m-%d %H:%M:%S\") if not pd.isna(ts) else None)\n",
    "    return df\n",
    "\n",
    "def blocks(x):\n",
    "    \"\"\"Build the ``blocks`` rows from the ``app.bsky.graph.block`` records in ``x``.\n",
    "\n",
    "    Returns a DataFrame with cid_entry, subject, did and createdAt; createdAt is\n",
    "    a ``YYYY-MM-DD HH:MM:SS`` UTC string, or None when it could not be parsed.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(x[\"app.bsky.graph.block\"])\n",
    "\n",
    "    def as_text(moment):\n",
    "        if pd.isna(moment):\n",
    "            return None\n",
    "        return moment.strftime(\"%Y-%m-%d %H:%M:%S\")\n",
    "\n",
    "    when = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    when = when.where(when.notnull(), None)\n",
    "    df['createdAt'] = when.apply(as_text)\n",
    "\n",
    "    selected = [\"cid_entry\", \"subject\", \"did\", \"createdAt\"]\n",
    "    return df.loc[:, selected]\n",
    "\n",
    "def listblock(x):\n",
    "    \"\"\"Build the ``listblock`` rows from the ``app.bsky.graph.listblock`` records in ``x``.\n",
    "\n",
    "    Returns a DataFrame with cid_entry, subject, createdAt and did; createdAt is\n",
    "    a ``YYYY-MM-DD HH:MM:SS`` UTC string, or None when it could not be parsed.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(x[\"app.bsky.graph.listblock\"])\n",
    "\n",
    "    def stamp_or_none(value):\n",
    "        return value.strftime(\"%Y-%m-%d %H:%M:%S\") if not pd.isna(value) else None\n",
    "\n",
    "    converted = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    converted = converted.where(converted.notnull(), None)\n",
    "    df['createdAt'] = converted.apply(stamp_or_none)\n",
    "\n",
    "    keep = [\"cid_entry\", \"subject\", \"createdAt\", \"did\"]\n",
    "    return df.loc[:, keep]\n",
    "\n",
    "def create_list(data):\n",
    "    \"\"\"Build the ``create_list`` rows from the ``app.bsky.graph.list`` records in ``data``.\n",
    "\n",
    "    Returns a DataFrame with cid_entry, purpose, description, did and createdAt;\n",
    "    createdAt is a ``YYYY-MM-DD HH:MM:SS`` UTC string, or None when unparseable.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(data[\"app.bsky.graph.list\"])\n",
    "\n",
    "    def format_created(ts):\n",
    "        if pd.isna(ts):\n",
    "            return None\n",
    "        return ts.strftime(\"%Y-%m-%d %H:%M:%S\")\n",
    "\n",
    "    created = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    created = created.where(created.notnull(), None)\n",
    "    df['createdAt'] = created.apply(format_created)\n",
    "\n",
    "    table_columns = [\"cid_entry\", \"purpose\", \"description\", \"did\", \"createdAt\"]\n",
    "    df = df.loc[:, table_columns]\n",
    "    return df\n",
    "\n",
    "def listitem(x):\n",
    "    \"\"\"Build the ``listitem`` rows from the ``app.bsky.graph.listitem`` records in ``x``.\n",
    "\n",
    "    Returns a DataFrame with cid_entry, list, subject, createdAt and did;\n",
    "    createdAt is a ``YYYY-MM-DD HH:MM:SS`` UTC string, or None when unparseable.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(x[\"app.bsky.graph.listitem\"])\n",
    "\n",
    "    def printable(ts):\n",
    "        return None if pd.isna(ts) else ts.strftime(\"%Y-%m-%d %H:%M:%S\")\n",
    "\n",
    "    timestamps = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    timestamps = timestamps.where(timestamps.notnull(), None)\n",
    "    df['createdAt'] = timestamps.apply(printable)\n",
    "\n",
    "    ordered = [\"cid_entry\", \"list\", \"subject\", \"createdAt\", \"did\"]\n",
    "    df = df.loc[:, ordered]\n",
    "    return df\n",
    "\n",
    "\n",
    "def get_quote_info(x):\n",
    "    \"\"\"Return ``(cid, uri)`` of the record quoted by a post embed.\n",
    "\n",
    "    Args:\n",
    "        x: The post's ``embed`` value. Only dicts whose ``$type`` is\n",
    "            ``app.bsky.embed.record`` or ``app.bsky.embed.recordWithMedia``\n",
    "            carry a quote; for the latter the reference is nested one level\n",
    "            deeper under ``record.record``.\n",
    "\n",
    "    Returns:\n",
    "        A ``(cid, uri)`` tuple; ``(None, None)`` for every other input,\n",
    "        including embeds whose ``record`` entry is missing or not a dict.\n",
    "    \"\"\"\n",
    "    if not isinstance(x, dict):\n",
    "        return None, None\n",
    "\n",
    "    embed_type = x.get(\"$type\")\n",
    "    reference = x.get(\"record\")\n",
    "    if embed_type == \"app.bsky.embed.recordWithMedia\":\n",
    "        reference = reference.get(\"record\") if isinstance(reference, dict) else None\n",
    "    elif embed_type != \"app.bsky.embed.record\":\n",
    "        return None, None\n",
    "\n",
    "    # This function is applied to the embed column directly (not through\n",
    "    # apply_if_type), so a malformed reference must not raise here or the whole\n",
    "    # posts table of the file is lost.\n",
    "    if not isinstance(reference, dict):\n",
    "        return None, None\n",
    "    return reference.get(\"cid\", None), reference.get(\"uri\", None)\n",
    "\n",
    "def threadgate(x):\n",
    "    \"\"\"Build the ``threadgate`` rows from the ``app.bsky.feed.threadgate`` records in ``x``.\n",
    "\n",
    "    ``rule_type`` is the ``$type`` of the first entry of the record's ``allow``\n",
    "    list: None when ``allow`` is an empty list or not a list, '' when ``allow``\n",
    "    is missing (NaN) or its first entry is malformed.\n",
    "\n",
    "    Returns:\n",
    "        DataFrame with cid_entry, post, rule_type, createdAt and did; createdAt\n",
    "        is a ``YYYY-MM-DD HH:MM:SS`` UTC string, or None when unparseable.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(x[\"app.bsky.feed.threadgate\"])\n",
    "\n",
    "    # When no record in the batch has an allow list the column is absent and the\n",
    "    # lookup below used to raise KeyError, dropping the whole table for the file.\n",
    "    if \"allow\" not in df.columns:\n",
    "        df[\"allow\"] = np.nan\n",
    "\n",
    "    def first_rule_type(allow):\n",
    "        if isinstance(allow, list) and len(allow) > 0:\n",
    "            return allow[0].get(\"$type\")\n",
    "        return None\n",
    "\n",
    "    df[\"rule_type\"] = df[\"allow\"].apply(lambda allow: apply_if_type(allow, first_rule_type))\n",
    "\n",
    "    created = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    df['createdAt'] = created.apply(lambda ts: ts.strftime(\"%Y-%m-%d %H:%M:%S\") if not pd.isna(ts) else None)\n",
    "\n",
    "    return df.loc[:, [\"cid_entry\", \"post\", \"rule_type\", \"createdAt\", \"did\"]]\n",
    "\n",
    "def posts(x):\n",
    "    \"\"\"Build the ``posts`` rows from the ``app.bsky.feed.post`` records in ``x``.\n",
    "\n",
    "    Derived columns:\n",
    "        has_image: True when the embed is an images or recordWithMedia embed.\n",
    "        link: URI of an external embed.\n",
    "        quote_cid / quote_uri / quote_did: reference to a quoted record.\n",
    "        reply_root_* / reply_parent_*: cid, uri and the third \"/\"-separated\n",
    "            piece of the uri for the thread root and the direct parent.\n",
    "        langs: the language list joined with \", \".\n",
    "\n",
    "    Returns:\n",
    "        DataFrame with exactly the columns of the ``posts`` table; createdAt is\n",
    "        a ``YYYY-MM-DD HH:MM:SS`` UTC string, or None when unparseable.\n",
    "\n",
    "    Raises:\n",
    "        KeyError: if ``x`` has no ``\"app.bsky.feed.post\"`` entry or the records\n",
    "            lack ``cid_entry``, ``did`` or ``createdAt``.\n",
    "    \"\"\"\n",
    "    df = pd.DataFrame(x[\"app.bsky.feed.post\"])\n",
    "\n",
    "    # A batch in which no post has one of these keys yields a frame without the\n",
    "    # column; the lookups below used to raise KeyError and drop every post of\n",
    "    # the file. Back-fill with NaN, which the extraction helpers treat as missing.\n",
    "    for optional in (\"text\", \"embed\", \"reply\", \"langs\"):\n",
    "        if optional not in df.columns:\n",
    "            df[optional] = np.nan\n",
    "\n",
    "    def did_from_uri(uri):\n",
    "        \"\"\"Third \"/\"-separated piece of ``uri``, or None when it is not such a string.\"\"\"\n",
    "        if isinstance(uri, str):\n",
    "            pieces = uri.split(\"/\")\n",
    "            if len(pieces) > 2:\n",
    "                return pieces[2]\n",
    "        return None\n",
    "\n",
    "    def reply_field(ref, key):\n",
    "        \"\"\"Mapper returning ``reply[ref][key]`` ('' when the reply is missing or malformed).\"\"\"\n",
    "        return lambda reply: apply_if_type(reply, lambda r: r.get(ref, {}).get(key))\n",
    "\n",
    "    #Error uploading data to posts: A string literal cannot contain NUL (0x00) characters.\n",
    "    df[\"text\"] = df[\"text\"].apply(lambda t: t.replace(\"\\x00\", \"\") if isinstance(t, str) else t)\n",
    "\n",
    "    image_embed_types = (\"app.bsky.embed.images\", \"app.bsky.embed.recordWithMedia\")\n",
    "    df[\"has_image\"] = df[\"embed\"].apply(\n",
    "        lambda e: apply_if_type(e, lambda y: isinstance(y, dict) and y[\"$type\"] in image_embed_types)\n",
    "    )\n",
    "    # apply_if_type yields '' for missing embeds; bool('') is False.\n",
    "    df[\"has_image\"] = df[\"has_image\"].astype(bool)\n",
    "\n",
    "    df[\"link\"] = df[\"embed\"].apply(lambda e: apply_if_type(e, lambda y: y.get(\"external\", {}).get(\"uri\") if isinstance(y, dict) and y.get(\"$type\") == \"app.bsky.embed.external\" else None))\n",
    "\n",
    "    df[\"quote_cid\"], df[\"quote_uri\"] = zip(*df[\"embed\"].apply(get_quote_info))\n",
    "    df[\"quote_did\"] = df[\"quote_uri\"].apply(did_from_uri)\n",
    "\n",
    "    for ref in (\"root\", \"parent\"):\n",
    "        df[f\"reply_{ref}_cid\"] = df[\"reply\"].apply(reply_field(ref, \"cid\"))\n",
    "        df[f\"reply_{ref}_uri\"] = df[\"reply\"].apply(reply_field(ref, \"uri\"))\n",
    "        df[f\"reply_{ref}_did\"] = df[f\"reply_{ref}_uri\"].apply(did_from_uri)\n",
    "\n",
    "    created = pd.to_datetime(df['createdAt'], utc=True, errors='coerce')\n",
    "    df['createdAt'] = created.apply(lambda ts: ts.strftime(\"%Y-%m-%d %H:%M:%S\") if not pd.isna(ts) else None)\n",
    "    df[\"langs\"] = df[\"langs\"].apply(lambda l: apply_if_type(l, lambda y: \", \".join(y) if isinstance(y, list) else None))\n",
    "\n",
    "    df = df.loc[:, [\"cid_entry\", \"text\", \"langs\", \"createdAt\", \"did\", \"has_image\", \"link\", \"quote_cid\", \"quote_uri\", \"quote_did\", \"reply_root_cid\", \"reply_root_uri\", \"reply_root_did\", \"reply_parent_cid\", \"reply_parent_uri\", \"reply_parent_did\"]]\n",
    "    return df\n",
    "    \n",
    "\n",
    "\n",
    "# DDL executed by upload_data_to_sql before every upload: one CREATE TABLE IF NOT EXISTS\n",
    "# statement per target table, so running it against an existing database is harmless.\n",
    "# Each column list matches the columns returned by the preparation function of the same\n",
    "# name (profiles, follows, likes, ...). All columns are text except the createdAt\n",
    "# timestamps and posts.has_image (boolean).\n",
    "# NOTE(review): identifiers are unquoted, so PostgreSQL presumably folds createdAt /\n",
    "# displayName to lowercase -- confirm before using quoted column names in queries.\n",
    "queries = [\n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS profiles (\n",
    "        cid_entry text,\n",
    "        did text,\n",
    "        description text,\n",
    "        displayName text,\n",
    "        createdAt timestamp\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS follows (\n",
    "        cid_entry text,\n",
    "        subject text,\n",
    "        did text,\n",
    "        createdAt timestamp\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS likes (\n",
    "        cid_entry text,\n",
    "        did text,\n",
    "        subject text,\n",
    "        subject_cid text,\n",
    "        subject_uri text,\n",
    "        createdAt timestamp\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS reposts (\n",
    "        cid_entry text,\n",
    "        did text,\n",
    "        subject text,\n",
    "        subject_cid text,\n",
    "        subject_uri text,\n",
    "        createdAt timestamp\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS blocks (\n",
    "        cid_entry text,\n",
    "        subject text,\n",
    "        did text,\n",
    "        createdAt timestamp\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS listblock (\n",
    "        cid_entry text,\n",
    "        subject text,\n",
    "        createdAt timestamp,\n",
    "        did text\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS create_list (\n",
    "        cid_entry text,\n",
    "        purpose text,\n",
    "        description text,\n",
    "        did text,\n",
    "        createdAt timestamp\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS listitem (\n",
    "        cid_entry text,\n",
    "        list text,\n",
    "        subject text,\n",
    "        createdAt timestamp,\n",
    "        did text\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS threadgate (\n",
    "        cid_entry text,\n",
    "        post text,\n",
    "        rule_type text,\n",
    "        createdAt timestamp,\n",
    "        did text\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS feed_generator (\n",
    "        cid_entry text,\n",
    "        did text,\n",
    "        createdAt timestamp,\n",
    "        description text,\n",
    "        displayName text,\n",
    "        skyfeedBuilder_id_input text,\n",
    "        skyfeedBuilder_id_regex text,\n",
    "        skyfeedBuilder_id_remove text,\n",
    "        skyfeedBuilder_id_sort text,\n",
    "        skyfeedBuilder_value_regex text,\n",
    "        skyfeedBuilder_sort_type text\n",
    "    );\"\"\",\n",
    "    \n",
    "    \"\"\"CREATE TABLE IF NOT EXISTS posts (\n",
    "        cid_entry text,\n",
    "        text text,\n",
    "        langs text,\n",
    "        createdAt timestamp,\n",
    "        did text,\n",
    "        has_image boolean,\n",
    "        link text,\n",
    "        quote_cid text,\n",
    "        quote_uri text,\n",
    "        quote_did text,\n",
    "        reply_root_cid text,\n",
    "        reply_root_uri text,\n",
    "        reply_root_did text,\n",
    "        reply_parent_cid text,\n",
    "        reply_parent_uri text,\n",
    "        reply_parent_did text\n",
    "    );\"\"\"\n",
    "]\n",
    "\n",
    "def upload_data_to_table(data, table_name, conn):\n",
    "    \"\"\"Insert a prepared DataFrame into ``table_name``.\n",
    "\n",
    "    Args:\n",
    "        data: DataFrame holding at least every column of the target table.\n",
    "            It is read only; the caller's frame is not modified.\n",
    "        table_name: One of the table names known to this function.\n",
    "        conn: Open database connection providing ``cursor()``, ``commit()``\n",
    "            and ``rollback()``.\n",
    "\n",
    "    Raises:\n",
    "        KeyError: if ``table_name`` is not a known table.\n",
    "        ValueError: if ``data`` lacks a column of the target table.\n",
    "\n",
    "    Database errors are not raised: the transaction is rolled back and the\n",
    "    error is printed.\n",
    "    \"\"\"\n",
    "    # Define the expected data types for each column based on the table schema\n",
    "    expected_dtypes = {\n",
    "        \"profiles\": {\"cid_entry\":str, \"did\": str, \"description\": str, \"displayName\": str, \"createdAt\": str},\n",
    "        \"follows\": {\"cid_entry\":str, \"subject\": str, \"did\": str, \"createdAt\": str},\n",
    "        \"likes\": {\"cid_entry\":str, \"did\": str, \"subject\": str, \"subject_cid\": str, \"subject_uri\": str, \"createdAt\": str},\n",
    "        \"reposts\": {\"cid_entry\":str, \"did\": str, \"subject\": str, \"subject_cid\": str, \"subject_uri\": str, \"createdAt\": str},\n",
    "        \"blocks\": {\"cid_entry\":str, \"subject\": str, \"did\": str, \"createdAt\": str},\n",
    "        \"listblock\": {\"cid_entry\":str, \"subject\": str, \"createdAt\": str, \"did\": str},\n",
    "        \"create_list\": {\"cid_entry\":str, \"purpose\": str, \"description\": str, \"did\": str, \"createdAt\": str},\n",
    "        \"listitem\": {\"cid_entry\":str, \"list\": str, \"subject\": str, \"createdAt\": str, \"did\": str},\n",
    "        \"threadgate\": {\"cid_entry\":str, \"post\": str, \"rule_type\": str, \"createdAt\": str, \"did\": str},\n",
    "        \"feed_generator\": {\"cid_entry\":str, \"did\": str, \"createdAt\": str, \"description\": str, \"displayName\": str,\n",
    "                        \"skyfeedBuilder_id_input\": str, \"skyfeedBuilder_id_regex\": str,\n",
    "                        \"skyfeedBuilder_id_remove\": str, \"skyfeedBuilder_id_sort\": str,\n",
    "                        \"skyfeedBuilder_value_regex\": str, \"skyfeedBuilder_sort_type\": str},\n",
    "        \"posts\": {\"cid_entry\":str, \"text\": str, \"langs\": str, \"createdAt\": str, \"did\": str, \"has_image\": bool,\n",
    "                \"link\": str, \"quote_cid\": str, \"quote_uri\": str, \"quote_did\": str, \"reply_root_cid\": str,\n",
    "                \"reply_root_uri\": str, \"reply_root_did\": str, \"reply_parent_cid\": str, \"reply_parent_uri\": str,\n",
    "                \"reply_parent_did\": str}\n",
    "    }\n",
    "\n",
    "    # Get the expected data types for the current table\n",
    "    table_dtypes = expected_dtypes[table_name]\n",
    "\n",
    "    # Check if the DataFrame has the expected columns\n",
    "    missing_columns = set(table_dtypes.keys()) - set(data.columns)\n",
    "    if missing_columns:\n",
    "        raise ValueError(f\"Missing columns in DataFrame for table '{table_name}': {missing_columns}\")\n",
    "\n",
    "    def to_text(value):\n",
    "        # Missing values must reach the database as NULL. A plain astype(str)\n",
    "        # would store NaN as the literal 'nan', and undoing 'None' afterwards\n",
    "        # would also erase genuine text that happens to read \"None\".\n",
    "        if value is None or value is pd.NaT or value is pd.NA:\n",
    "            return None\n",
    "        if isinstance(value, float) and np.isnan(value):\n",
    "            return None\n",
    "        return str(value)\n",
    "\n",
    "    # Convert each schema column to plain Python values of the expected type.\n",
    "    # Only schema columns are uploaded, so stray extra columns cannot break the insert.\n",
    "    columns = list(table_dtypes)\n",
    "    converted = []\n",
    "    for column in columns:\n",
    "        dtype = table_dtypes[column]\n",
    "        if dtype is str:\n",
    "            converted.append([to_text(value) for value in data[column]])\n",
    "        else:\n",
    "            converted.append(data[column].astype(dtype).tolist())\n",
    "    values = list(zip(*converted))\n",
    "\n",
    "    # Upload the data to the SQL table. table_name was validated against the\n",
    "    # schema dict above, so interpolating it into the statement is safe.\n",
    "    query = f\"INSERT INTO {table_name} ({','.join(columns)}) VALUES ({','.join(['%s'] * len(columns))})\"\n",
    "    with conn.cursor() as cursor:\n",
    "        try:\n",
    "            cursor.executemany(query, values)\n",
    "            conn.commit()\n",
    "        except Exception as e:\n",
    "            conn.rollback()\n",
    "            print(f\"Error uploading data to {table_name}: {str(e)}\")\n",
    "\n",
    "\n",
    "def upload_data_to_sql(conn, data, verbose=True):\n",
    "    \"\"\"Create the target tables if needed and upload every record type found in ``data``.\n",
    "\n",
    "    Args:\n",
    "        conn: Open database connection.\n",
    "        data: Mapping of record type to a list of record dicts, as returned by\n",
    "            ``create_data_dict``.\n",
    "        verbose: Print a completion message when True.\n",
    "\n",
    "    A failure while preparing or uploading one table is printed and does not\n",
    "    stop the remaining tables; this includes record types absent from ``data``.\n",
    "    Errors raised while creating the tables propagate to the caller.\n",
    "    \"\"\"\n",
    "    # Execute the queries to create the tables if they don't exist. The context\n",
    "    # manager closes the cursor, which previously stayed open.\n",
    "    with conn.cursor() as cursor:\n",
    "        for query in queries:\n",
    "            cursor.execute(query)\n",
    "    conn.commit()\n",
    "\n",
    "    # Target table paired with the function preparing its rows; keeping them in\n",
    "    # one list means the two can no longer drift out of step.\n",
    "    table_builders = [\n",
    "        (\"posts\", posts),\n",
    "        (\"profiles\", profiles),\n",
    "        (\"follows\", follows),\n",
    "        (\"likes\", likes),\n",
    "        (\"reposts\", reposts),\n",
    "        (\"blocks\", blocks),\n",
    "        (\"listblock\", listblock),\n",
    "        (\"create_list\", create_list),\n",
    "        (\"listitem\", listitem),\n",
    "        (\"threadgate\", threadgate),\n",
    "        (\"feed_generator\", feed_generator),\n",
    "    ]\n",
    "\n",
    "    for table_name, prep_func in table_builders:\n",
    "        try:\n",
    "            df = prep_func(data)\n",
    "            upload_data_to_table(df, table_name, conn)\n",
    "        except Exception as e:\n",
    "            print(f\"Error uploading data to {table_name}: {str(e)}\") # We dont check whether a key is present.\n",
    "\n",
    "    if verbose:\n",
    "        print(\"Data uploaded to SQL\")\n",
    "    conn.commit()\n",
    "\n",
    "def create_data_dict(file, base_dir=\"../../get_repo_temp/Data/full_download\"):\n",
    "    \"\"\"Load one gzipped JSON batch and group its records by record type.\n",
    "\n",
    "    Args:\n",
    "        file: File name of the batch inside ``base_dir``.\n",
    "        base_dir: Directory holding the batch files; defaults to the download\n",
    "            directory that was previously hard-coded.\n",
    "\n",
    "    Returns:\n",
    "        Dict mapping each ``$type`` string to the list of records of that type,\n",
    "        every record extended with a ``did`` key naming the user it belongs to.\n",
    "        None when the file cannot be read or parsed (the error is printed).\n",
    "    \"\"\"\n",
    "    print(f\"Processing {file}\")\n",
    "    path = os.path.join(base_dir, file)\n",
    "    try:\n",
    "        # JSON text is UTF-8; do not depend on the platform default encoding.\n",
    "        with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n",
    "            data = json.load(f)\n",
    "        sorted_entries = {}\n",
    "        for user, entries in data.items():\n",
    "            if not entries:\n",
    "                continue\n",
    "            for entry in entries:\n",
    "                # Skip malformed records instead of failing the whole file: a\n",
    "                # non-dict entry or one without a string $type used to raise\n",
    "                # here and discard every record of the batch.\n",
    "                if not isinstance(entry, dict):\n",
    "                    continue\n",
    "                record_type = entry.get(\"$type\")\n",
    "                if not isinstance(record_type, str):\n",
    "                    continue\n",
    "                # The record dict is annotated in place with its owner.\n",
    "                entry[\"did\"] = user\n",
    "                sorted_entries.setdefault(record_type, []).append(entry)\n",
    "    except Exception as e:\n",
    "        print(\"#\"*50)\n",
    "        print(f\"\\033[91mError reading or Processing file {file}: {str(e)}\\033[0m\")\n",
    "        print(\"#\"*50)\n",
    "        return None\n",
    "    return sorted_entries\n",
    "\n",
    "def process_and_upload_file(file):\n",
    "    \"\"\"Load one batch file, upload it to the database and record it as processed.\n",
    "\n",
    "    Args:\n",
    "        file: File name of the batch, as understood by ``create_data_dict``.\n",
    "\n",
    "    Nothing is uploaded or recorded when the file cannot be read. If the upload\n",
    "    raises, the exception propagates and the file is not recorded.\n",
    "    \"\"\"\n",
    "    data = create_data_dict(file)\n",
    "    if data is None:\n",
    "        return\n",
    "    # Connect to the database\n",
    "    # NOTE(review): connection settings are hard-coded placeholders; consider\n",
    "    # loading them from the environment instead of editing this file.\n",
    "    conn = psycopg2.connect(\n",
    "        dbname=\"xxx\",\n",
    "        user=\"xxx\",\n",
    "        password=\"xxx\",\n",
    "        host=\"xxx\"\n",
    "    )\n",
    "\n",
    "    # Upload the data to the database; always release the connection, which\n",
    "    # previously leaked whenever the upload raised.\n",
    "    try:\n",
    "        upload_data_to_sql(conn, data, verbose=False)\n",
    "    finally:\n",
    "        conn.close()\n",
    "\n",
    "    # Append the processed file to the text file\n",
    "    with open(\"../processed_files.txt\", \"a\") as f:\n",
    "        f.write(f\"{file}\\n\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from data_processing import process_and_upload_file\n",
    "from multiprocessing import Pool\n",
    "from tqdm import tqdm\n",
    "import os\n",
    "\n",
    "def main():\n",
    "    \"\"\"Upload every not yet processed ``batch*`` file using a pool of 8 worker processes.\n",
    "\n",
    "    File names listed in ``../processed_files.txt`` are skipped; progress is\n",
    "    shown with a tqdm bar.\n",
    "    \"\"\"\n",
    "    # Names of the files finished by earlier runs (one per line)\n",
    "    ledger = \"../processed_files.txt\"\n",
    "    already_done = set()\n",
    "    if os.path.exists(ledger):\n",
    "        with open(ledger, \"r\") as handle:\n",
    "            already_done = set(handle.read().splitlines())\n",
    "\n",
    "    source_dir = \"../../get_repo_temp/Data/full_download/\"\n",
    "    candidates = [name for name in os.listdir(source_dir) if name.startswith(\"batch\")]\n",
    "    print(f\"Processing {len(candidates)} batches\")\n",
    "\n",
    "    pending = [name for name in candidates if name not in already_done]\n",
    "    print(f\"Processing {len(pending)} batches after filtering processed files\")\n",
    "\n",
    "    with Pool(processes=8) as pool:\n",
    "        results = pool.imap(process_and_upload_file, pending)\n",
    "        # Consuming the iterator drives the work and advances the progress bar.\n",
    "        list(tqdm(results, total=len(pending)))\n",
    "\n",
    "# Start the backfill only when this code runs as the main program, not when it is imported.\n",
    "if __name__ == \"__main__\":\n",
    "    main()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
